import gzip
from typing import List, Optional, Literal, Any

import numpy as np
import polars as pl
from numba import njit, from_dtype
from numba.experimental import jitclass
from numpy.typing import NDArray

from .. import FuseMarketDepth
from ..validation import correct_event_order, validate_event_order, correct_local_timestamp
from ...types import (
    DEPTH_EVENT,
    DEPTH_CLEAR_EVENT,
    DEPTH_SNAPSHOT_EVENT,
    TRADE_EVENT,
    BUY_EVENT,
    SELL_EVENT,
    event_dtype, DEPTH_BBO_EVENT, EVENT_ARRAY
)

trade_schema = {
    'exchange': pl.String,
    'symbol': pl.String,
    'timestamp': pl.Int64,
    'local_timestamp': pl.Int64,
    'id': pl.String,
    'side': pl.String,
    'price': pl.Float64,
    'amount': pl.Float64,
}

depth_schema = {
    'exchange': pl.String,
    'symbol': pl.String,
    'timestamp': pl.Int64,
    'local_timestamp': pl.Int64,
    'is_snapshot': pl.Boolean,
    'side': pl.String,
    'price': pl.Float64,
    'amount': pl.Float64,
}

book_ticker_schema = {
    'exchange': pl.String,
    'symbol': pl.String,
    'timestamp': pl.Int64,
    'local_timestamp': pl.Int64,
    'ask_amount': pl.Float64,
    'ask_price': pl.Float64,
    'bid_price': pl.Float64,
    'bid_amount': pl.Float64,
}


def convert(
        input_files: List[str],
        output_filename: Optional[str] = None,
        buffer_size: int = 100_000_000,
        ss_buffer_size: int = 1_000_000,
        base_latency: float = 0,
        snapshot_mode: Literal['process', 'ignore_sod', 'ignore'] = 'process',
) -> NDArray:
    r"""
    Converts Tardis.dev data files into a format compatible with HftBacktest.

    For Tardis's Binance Futures feed data, they use the 'E' event timestamp, representing the sending time, rather
    than the 'T' transaction time, indicating when the matching occurs. So the latency is slightly less than it actually
    is.

    If you encounter an ``IndexError`` due to an out-of-bounds, try increasing the ``buffer_size`` and
    ``ss_buffer_size``.

    Args:
        input_files: Input filenames for both incremental book and trades files.
                     Trade files should be input before depth files. If a depth update generated by a trade is handled
                     first, the queue position is reduced twice—once by the depth message and again by the subsequent
                     trade message. Because the depth message already subtracts the traded quantity, the associated
                     trade message must be processed first to prevent this double-counting.
                     e.g. ['trades.csv.gz', 'incremental_book.csv.gz'].
        output_filename: If provided, the converted data will be saved to the specified filename in ``npz`` format.
        buffer_size: Sets a preallocated row size for the buffer.
        ss_buffer_size: Sets a preallocated row size for the snapshot.
        base_latency: The value to be added to the feed latency.
                      See :func:`.correct_local_timestamp`.
        snapshot_mode: - If this is set to 'ignore', all snapshots are ignored. The order book will converge to a
                         complete order book over time.
                       - If this is set to 'ignore_sod', the SOD (Start of Day) snapshot is ignored.
                         Since Tardis intentionally adds the SOD snapshot, not due to a message ID gap or disconnection,
                         there might not be a need to process SOD snapshot to build a complete order book.
                         Please see https://docs.tardis.dev/historical-data-details#collected-order-book-data-details
                         for more details.
                       - Otherwise, all snapshot events will be processed.
    Returns:
        Converted data compatible with HftBacktest.
    """
    tmp = np.empty(buffer_size, event_dtype)
    ss_bid = np.empty(ss_buffer_size, event_dtype)
    ss_ask = np.empty(ss_buffer_size, event_dtype)

    row_num = 0

    for file in input_files:
        print('Reading %s' % file)

        schema = None
        if 'trades' in file:
            schema = trade_schema
        elif 'incremental_book_L2' in file:
            schema = depth_schema
        elif 'book_ticker' in file:
            schema = book_ticker_schema
        else:
            # Attempts to infer the file type using its header.
            try:
                if file.endswith('.gz'):
                    with gzip.open(file) as f:
                        line = f.readline()
                        header = line.decode().strip().split(',')
                else:
                    with open(file) as f:
                        line = f.readline()
                        header = line.strip().split(',')
                if header == list(trade_schema.keys()):
                    schema = trade_schema
                elif header == list(depth_schema.keys()):
                    schema = depth_schema
                elif header == list(book_ticker_schema.keys()):
                    schema = book_ticker_schema
            except:
                # Fails to infer the file type; let Polars infer the schema.
                pass

        df = pl.read_csv(file, schema=schema)
        if df.columns == list(trade_schema.keys()):
            arr = (
                df.with_columns(
                    pl.when(pl.col('side') == 'buy')
                        .then(BUY_EVENT | TRADE_EVENT)
                        .when(pl.col('side') == 'sell')
                        .then(SELL_EVENT | TRADE_EVENT)
                        .otherwise(TRADE_EVENT)
                        .cast(pl.UInt64, strict=True)
                        .alias('ev'),
                    (pl.col('timestamp') * 1000)
                        .cast(pl.Int64, strict=True)
                        .alias('exch_ts'),
                    (pl.col('local_timestamp') * 1000)
                        .cast(pl.Int64, strict=True)
                        .alias('local_ts'),
                    pl.col('price')
                        .cast(pl.Float64, strict=True)
                        .alias('px'),
                    pl.col('amount')
                        .cast(pl.Float64, strict=True)
                        .alias('qty'),
                    pl.lit(0)
                        .cast(pl.UInt64, strict=True)
                        .alias('order_id'),
                    pl.lit(0)
                        .cast(pl.Int64, strict=True)
                        .alias('ival'),
                    pl.lit(0.0)
                        .cast(pl.Float64, strict=True)
                        .alias('fval')
                )
                .select(['ev', 'exch_ts', 'local_ts', 'px', 'qty', 'order_id', 'ival', 'fval'])
                .to_numpy(structured=True)
            )
            tmp[row_num:row_num + len(arr)] = arr[:]
            row_num += len(arr)
        elif df.columns == list(depth_schema.keys()):
            arr = (
                df.with_columns(
                    (pl.col('timestamp') * 1000)
                        .cast(pl.Int64, strict=True)
                        .alias('exch_ts'),
                    (pl.col('local_timestamp') * 1000)
                        .cast(pl.Int64, strict=True)
                        .alias('local_ts'),
                    pl.col('price')
                        .cast(pl.Float64, strict=True)
                        .alias('px'),
                    pl.col('amount')
                        .cast(pl.Float64, strict=True)
                        .alias('qty'),
                    pl.when((pl.col('side') == 'bid') | (pl.col('side') == 'buy'))
                        .then(1)
                        .when((pl.col('side') == 'ask') | (pl.col('side') == 'sell'))
                        .then(-1)
                        .otherwise(0)
                        .cast(pl.Int8, strict=True)
                        .alias('side'),
                    pl.when(pl.col('is_snapshot'))
                        .then(1)
                        .otherwise(0)
                        .cast(pl.Int8, strict=True)
                        .alias('is_snapshot')
                )
                .select(['exch_ts', 'local_ts', 'px', 'qty', 'side', 'is_snapshot'])
                .to_numpy(structured=True)
            )

            snapshot_mode_flag = 0
            if snapshot_mode == 'ignore':
                snapshot_mode_flag = SNAPSHOT_MODE_IGNORE
            elif snapshot_mode == 'ignore_sod':
                snapshot_mode_flag = SNAPSHOT_MODE_IGNORE_SOD
            row_num = _convert_depth(tmp, arr, row_num, ss_bid, ss_ask, snapshot_mode_flag)
        elif df.columns == list(book_ticker_schema.keys()):
            raise ValueError('Use `convert_fuse` instead of `convert` to combine book ticker data with the depth data.')

    tmp = tmp[:row_num]

    print('Correcting the latency')
    tmp = correct_local_timestamp(tmp, base_latency)

    print('Correcting the event order')
    data = correct_event_order(
        tmp,
        np.argsort(tmp['exch_ts'], kind='mergesort'),
        np.argsort(tmp['local_ts'], kind='mergesort')
    )

    validate_event_order(data)

    if output_filename is not None:
        print('Saving to %s' % output_filename)
        np.savez_compressed(output_filename, data=data)

    return data


SNAPSHOT_MODE_IGNORE = 1
SNAPSHOT_MODE_IGNORE_SOD = 2


@njit(cache=True)
def _convert_depth(out, inp, row_num, ss_bid, ss_ask, snapshot_mode):
    ss_bid_rn = 0
    ss_ask_rn = 0
    is_sod_snapshot = True
    is_snapshot = False
    for rn in range(len(inp)):
        row = inp[rn]

        # ALL FIXES HERE: use dictionary-style access ['field'] instead of .field
        if row['is_snapshot'] == 1:
            if (
                (snapshot_mode == SNAPSHOT_MODE_IGNORE)
                or (snapshot_mode == SNAPSHOT_MODE_IGNORE_SOD and is_sod_snapshot)
            ):
                continue
            if not is_snapshot:
                is_snapshot = True
                ss_bid_rn = 0
                ss_ask_rn = 0
            if row['side'] == 1:
                ss_bid[ss_bid_rn].ev = DEPTH_SNAPSHOT_EVENT | BUY_EVENT
                ss_bid[ss_bid_rn].exch_ts = row['exch_ts']
                ss_bid[ss_bid_rn].local_ts = row['local_ts']
                ss_bid[ss_bid_rn].px = row['px']
                ss_bid[ss_bid_rn].qty = row['qty']
                ss_bid[ss_bid_rn].order_id = 0
                ss_bid[ss_bid_rn].ival = 0
                ss_bid[ss_bid_rn].fval = 0
                ss_bid_rn += 1
            else:
                ss_ask[ss_ask_rn].ev = DEPTH_SNAPSHOT_EVENT | SELL_EVENT
                ss_ask[ss_ask_rn].exch_ts = row['exch_ts']
                ss_ask[ss_ask_rn].local_ts = row['local_ts']
                ss_ask[ss_ask_rn].px = row['px']
                ss_ask[ss_ask_rn].qty = row['qty']
                ss_ask[ss_ask_rn].order_id = 0
                ss_ask[ss_ask_rn].ival = 0
                ss_ask[ss_ask_rn].fval = 0
                ss_ask_rn += 1
        else:
            is_sod_snapshot = False
            if is_snapshot:
                is_snapshot = False

                ss_bid = ss_bid[:ss_bid_rn]
                if len(ss_bid) > 0:
                    out[row_num].ev = DEPTH_CLEAR_EVENT | BUY_EVENT
                    out[row_num].exch_ts = ss_bid[0].exch_ts
                    out[row_num].local_ts = ss_bid[0].local_ts
                    out[row_num].px = ss_bid[-1].px
                    out[row_num].qty = 0
                    out[row_num].order_id = 0
                    out[row_num].ival = 0
                    out[row_num].fval = 0
                    row_num += 1
                    out[row_num:row_num + len(ss_bid)] = ss_bid[:]
                    row_num += len(ss_bid)
                ss_bid_rn = 0

                ss_ask = ss_ask[:ss_ask_rn]
                if len(ss_ask) > 0:
                    out[row_num].ev = DEPTH_CLEAR_EVENT | SELL_EVENT
                    out[row_num].exch_ts = ss_ask[0].exch_ts
                    out[row_num].local_ts = ss_ask[0].local_ts
                    out[row_num].px = ss_ask[-1].px
                    out[row_num].qty = 0
                    out[row_num].order_id = 0
                    out[row_num].ival = 0
                    out[row_num].fval = 0
                    row_num += 1
                    out[row_num:row_num + len(ss_ask)] = ss_ask[:]
                    row_num += len(ss_ask)
                ss_ask_rn = 0

            # Regular depth update
            out[row_num].ev = DEPTH_EVENT | (BUY_EVENT if row['side'] == 1 else SELL_EVENT)
            out[row_num].exch_ts = row['exch_ts']
            out[row_num].local_ts = row['local_ts']
            out[row_num].px = row['px']
            out[row_num].qty = row['qty']
            out[row_num].order_id = 0
            out[row_num].ival = 0
            out[row_num].fval = 0
            row_num += 1
    return row_num

@jitclass
class _Fuse:
    depth: FuseMarketDepth.class_type.instance_type
    ev: from_dtype(event_dtype)[:]
    ss_bid: from_dtype(event_dtype)[:]
    ss_ask: from_dtype(event_dtype)[:]

    def __init__(self, tick_size: float, lot_size: float, ss_buffer_size: int):
        self.depth = FuseMarketDepth(tick_size, lot_size)
        self.ev = np.zeros(1, event_dtype)
        self.ss_bid = np.zeros(ss_buffer_size, event_dtype)
        self.ss_ask = np.zeros(ss_buffer_size, event_dtype)

    def close(self) -> None:
        self.depth.close()

    def process_depth(self, inp: NDArray, rn: int, snapshot_mode: int) -> int:
        ss_bid_rn = 0
        ss_ask_rn = 0
        is_sod_snapshot = True
        is_snapshot = False
        for rn in range(rn, len(inp)):
            row = inp[rn]
            if row.is_snapshot == 1:
                # Prepare to insert DEPTH_SNAPSHOT_EVENT
                if not is_snapshot:
                    is_snapshot = True
                    ss_bid_rn = 0
                    ss_ask_rn = 0
                if row.side == 1:
                    self.ss_bid[ss_bid_rn].ev = DEPTH_SNAPSHOT_EVENT | BUY_EVENT
                    self.ss_bid[ss_bid_rn].exch_ts = row.exch_ts
                    self.ss_bid[ss_bid_rn].local_ts = row.local_ts
                    self.ss_bid[ss_bid_rn].px = row.px
                    self.ss_bid[ss_bid_rn].qty = row.qty
                    ss_bid_rn += 1
                else:
                    self.ss_ask[ss_ask_rn].ev = DEPTH_SNAPSHOT_EVENT | SELL_EVENT
                    self.ss_ask[ss_ask_rn].exch_ts = row.exch_ts
                    self.ss_ask[ss_ask_rn].local_ts = row.local_ts
                    self.ss_ask[ss_ask_rn].px = row.px
                    self.ss_ask[ss_ask_rn].qty = row.qty
                    ss_ask_rn += 1
            else:
                add = not (
                        (snapshot_mode == SNAPSHOT_MODE_IGNORE)
                        or (snapshot_mode == SNAPSHOT_MODE_IGNORE_SOD and is_sod_snapshot)
                )

                is_sod_snapshot = False
                if is_snapshot:
                    # End of the snapshot.
                    is_snapshot = False

                    # Add DEPTH_CLEAR_EVENT before refreshing the market depth by the snapshot.
                    if ss_bid_rn > 0:
                        # Clear the bid market depth within the snapshot bid range.
                        self.ev[0].ev = DEPTH_CLEAR_EVENT | BUY_EVENT
                        self.ev[0].exch_ts = self.ss_bid[0].exch_ts
                        self.ev[0].local_ts = self.ss_bid[0].local_ts
                        self.ev[0].px = self.ss_bid[ss_bid_rn - 1].px
                        self.ev[0].qty = 0
                        self.depth.process_event(self.ev, 0, add)
                        # Add DEPTH_SNAPSHOT_EVENT for the bid snapshot
                        for srn in range(ss_bid_rn):
                            self.depth.process_event(self.ss_bid, srn, add)

                    if ss_ask_rn > 0:
                        # Clear the ask market depth within the snapshot ask range.
                        self.ev[0].ev = DEPTH_CLEAR_EVENT | SELL_EVENT
                        self.ev[0].exch_ts = self.ss_ask[0].exch_ts
                        self.ev[0].local_ts = self.ss_ask[0].local_ts
                        self.ev[0].px = self.ss_ask[ss_ask_rn - 1].px
                        self.ev[0].qty = 0
                        self.depth.process_event(self.ev, 0, add)
                        # Add DEPTH_SNAPSHOT_EVENT for the ask snapshot
                        for srn in range(ss_ask_rn):
                            self.depth.process_event(self.ss_ask, srn, add)

                    return rn - 1
                else:
                    # Insert DEPTH_EVENT
                    self.ev[0].ev = DEPTH_EVENT | (BUY_EVENT if row.side == 1 else SELL_EVENT)
                    self.ev[0].exch_ts = row.exch_ts
                    self.ev[0].local_ts = row.local_ts
                    self.ev[0].px = row.px
                    self.ev[0].qty = row.qty
                    self.depth.process_event(self.ev, 0, True)
                    break
        return rn

    def process_bbo(self, inp: NDArray, rn: int) -> None:
        row = inp[rn]

        self.ev[0].ev = DEPTH_BBO_EVENT | SELL_EVENT
        self.ev[0].exch_ts = row.exch_ts
        self.ev[0].local_ts = row.local_ts
        self.ev[0].px = row.ask_price
        self.ev[0].qty = row.ask_amount
        self.depth.process_event(self.ev, 0, True)

        self.ev[0].ev = DEPTH_BBO_EVENT | BUY_EVENT
        self.ev[0].exch_ts = row.exch_ts
        self.ev[0].local_ts = row.local_ts
        self.ev[0].px = row.bid_price
        self.ev[0].qty = row.bid_amount
        self.depth.process_event(self.ev, 0, True)

    def process(
            self,
            depth_arr: NDArray,
            book_ticker_arr: NDArray,
            snapshot_mode: int
    ) -> None:
        ticker_rn = 0
        depth_rn = 0
        while True:
            if ticker_rn < len(book_ticker_arr):
                ticker_ts = book_ticker_arr[ticker_rn].local_ts
            else:
                ticker_ts = 0
            if depth_rn < len(depth_arr):
                depth_ts = depth_arr[depth_rn].local_ts
            else:
                depth_ts = 0

            if ticker_ts > 0 and depth_ts > 0:
                if ticker_ts < depth_ts:
                    self.process_bbo(book_ticker_arr, ticker_rn)
                    ticker_rn += 1
                else:
                    depth_rn = self.process_depth(depth_arr, depth_rn, snapshot_mode)
                    depth_rn += 1
            elif ticker_ts > 0:
                self.process_bbo(book_ticker_arr, ticker_rn)
                ticker_rn += 1
            elif depth_ts > 0:
                depth_rn = self.process_depth(depth_arr, depth_rn, snapshot_mode)
                depth_rn += 1
            else:
                break

    @property
    def fused_events(self) -> EVENT_ARRAY:
        return self.depth.fused_events


def convert_fuse(
        trades_filename: str,
        depth_filename: str,
        book_ticker_filename: str,
        tick_size: float,
        lot_size: float,
        output_filename: Optional[str] = None,
        ss_buffer_size: int = 1_000_000,
        base_latency: float = 0,
        snapshot_mode: Literal['process', 'ignore_sod', 'ignore'] = 'process',
) -> NDArray:
    r"""
    Converts Tardis.dev data files into a format compatible with HftBacktest.

    For Tardis's Binance Futures feed data, they use the 'E' event timestamp, representing the sending time, rather
    than the 'T' transaction time, indicating when the matching occurs. So the latency is slightly less than it actually
    is.

    If you encounter an ``IndexError`` due to an out-of-bounds, try increasing the ``ss_buffer_size``.

    Args:
        trades_filename: Input filenames for a trades file.
        depth_filename: Input filenames for an incremental book file.
        book_ticker_filename: Input filenames for a book ticker file.
        tick_size: tick size.
        lot_size: lot_size.
        output_filename: If provided, the converted data will be saved to the specified filename in ``npz`` format.
        ss_buffer_size: Sets a preallocated row size for the snapshot.
        base_latency: The value to be added to the feed latency.
                      See :func:`.correct_local_timestamp`.
        snapshot_mode: - If this is set to 'ignore', all snapshots are ignored. The order book will converge to a
                         complete order book over time.
                       - If this is set to 'ignore_sod', the SOD (Start of Day) snapshot is ignored.
                         Since Tardis intentionally adds the SOD snapshot, not due to a message ID gap or disconnection,
                         there might not be a need to process SOD snapshot to build a complete order book.
                         Please see https://docs.tardis.dev/historical-data-details#collected-order-book-data-details
                         for more details.
                       - Otherwise, all snapshot events will be processed.
    Returns:
        Converted data compatible with HftBacktest.
    """
    df = pl.read_csv(trades_filename, schema=trade_schema)
    if df.columns != list(trade_schema.keys()):
        raise KeyError
    trades_arr = (
        df.with_columns(
            pl.when(pl.col('side') == 'buy')
            .then(BUY_EVENT | TRADE_EVENT)
            .when(pl.col('side') == 'sell')
            .then(SELL_EVENT | TRADE_EVENT)
            .otherwise(TRADE_EVENT)
            .cast(pl.UInt64, strict=True)
            .alias('ev'),
            (pl.col('timestamp') * 1000)
            .cast(pl.Int64, strict=True)
            .alias('exch_ts'),
            (pl.col('local_timestamp') * 1000)
            .cast(pl.Int64, strict=True)
            .alias('local_ts'),
            pl.col('price')
            .cast(pl.Float64, strict=True)
            .alias('px'),
            pl.col('amount')
            .cast(pl.Float64, strict=True)
            .alias('qty'),
            pl.lit(0)
            .cast(pl.UInt64, strict=True)
            .alias('order_id'),
            pl.lit(0)
            .cast(pl.Int64, strict=True)
            .alias('ival'),
            pl.lit(0.0)
            .cast(pl.Float64, strict=True)
            .alias('fval')
        )
        .select(['ev', 'exch_ts', 'local_ts', 'px', 'qty', 'order_id', 'ival', 'fval'])
        .to_numpy(structured=True)
    )

    df = pl.read_csv(book_ticker_filename, schema=book_ticker_schema)
    if df.columns != list(book_ticker_schema.keys()):
        raise KeyError
    ticker_arr = (
        df.with_columns(
            (pl.col('timestamp') * 1000)
            .cast(pl.Int64, strict=True)
            .alias('exch_ts'),
            (pl.col('local_timestamp') * 1000)
            .cast(pl.Int64, strict=True)
            .alias('local_ts'),
            pl.col('ask_amount')
            .cast(pl.Float64, strict=True),
            pl.col('ask_price')
            .cast(pl.Float64, strict=True),
            pl.col('bid_price')
            .cast(pl.Float64, strict=True),
            pl.col('bid_amount')
            .cast(pl.Float64, strict=True),
            )
        .select(['exch_ts', 'local_ts', 'ask_amount', 'ask_price', 'bid_price', 'bid_amount'])
        .to_numpy(structured=True)
    )

    df = pl.read_csv(depth_filename, schema=depth_schema)
    if df.columns != list(depth_schema.keys()):
        raise KeyError
    depth_arr = (
        df.with_columns(
            (pl.col('timestamp') * 1000)
            .cast(pl.Int64, strict=True)
            .alias('exch_ts'),
            (pl.col('local_timestamp') * 1000)
            .cast(pl.Int64, strict=True)
            .alias('local_ts'),
            pl.col('price')
            .cast(pl.Float64, strict=True)
            .alias('px'),
            pl.col('amount')
            .cast(pl.Float64, strict=True)
            .alias('qty'),
            pl.when((pl.col('side') == 'bid') | (pl.col('side') == 'buy'))
            .then(1)
            .when((pl.col('side') == 'ask') | (pl.col('side') == 'sell'))
            .then(-1)
            .otherwise(0)
            .cast(pl.Int8, strict=True)
            .alias('side'),
            pl.when(pl.col('is_snapshot'))
            .then(1)
            .otherwise(0)
            .cast(pl.Int8, strict=True)
            .alias('is_snapshot')
        )
        .select(['exch_ts', 'local_ts', 'px', 'qty', 'side', 'is_snapshot'])
        .to_numpy(structured=True)
    )

    snapshot_mode_flag = 0
    if snapshot_mode == 'ignore':
        snapshot_mode_flag = SNAPSHOT_MODE_IGNORE
    elif snapshot_mode == 'ignore_sod':
        snapshot_mode_flag = SNAPSHOT_MODE_IGNORE_SOD

    fuse = _Fuse(tick_size, lot_size, ss_buffer_size)
    fuse.process(depth_arr, ticker_arr, snapshot_mode_flag)

    tmp = np.empty(len(trades_arr) + len(fuse.fused_events), event_dtype)
    tmp[:len(trades_arr)] = trades_arr
    tmp[len(trades_arr):] = fuse.fused_events

    fuse.close()

    print('Correcting the latency')
    tmp = correct_local_timestamp(tmp, base_latency)

    print('Correcting the event order')
    data = correct_event_order(
        tmp,
        np.argsort(tmp['exch_ts'], kind='mergesort'),
        np.argsort(tmp['local_ts'], kind='mergesort')
    )

    validate_event_order(data)

    if output_filename is not None:
        print('Saving to %s' % output_filename)
        np.savez_compressed(output_filename, data=data)

    return data
